-- enforce some defaults to be sure that the env settings will not affect the test
SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread', remote_filesystem_read_method='read';

-- we use query_thread_log to check peak thread usage
-- after https://github.com/ClickHouse/ClickHouse/issues/53417 there is a simpler way to check it
-- but that will not allow to backport the test to older versions
SET log_query_threads=1;


--------------------
SELECT 'prefer_localhost_replica=1, remote query with a lot of union all' AS testname;

-- query with lot of dummy union all will create a lot of streams
-- let's check how many threads clickhouse will start for that

select count() from remote('127.0.0.1:9000', view(
{% for n in range(77) -%}
SELECT * FROM system.one {{ "UNION ALL" if not loop.last }}
{% endfor -%}
 )) SETTINGS log_comment='check_concurrency_in_remote_queries1';

SYSTEM FLUSH LOGS;

WITH
    maxIntersections(
        toUnixTimestamp64Micro(query_start_time_microseconds),
        toUnixTimestamp64Micro(event_time_microseconds)
    ) as peak_threads
SELECT
    if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result
FROM system.query_thread_log
WHERE
    event_time > now() - 60
    AND query_id = (
        SELECT query_id
        FROM system.query_log
        WHERE
            type = 'QueryFinish'
            AND event_time > now() - 60
            AND log_comment = 'check_concurrency_in_remote_queries1'
            AND current_database = currentDatabase()
            ORDER BY event_time DESC LIMIT 1
    );

--------------------
SELECT 'prefer_localhost_replica=0, remote query with a lot of union all' AS testname;

select count() from remote('127.0.0.1:9000', view(
{% for n in range(77) -%}
SELECT * FROM system.one {{ "UNION ALL" if not loop.last }}
{% endfor -%}
 )) SETTINGS log_comment='check_concurrency_in_remote_queries2', prefer_localhost_replica=0;

SYSTEM FLUSH LOGS;

WITH
    maxIntersections(
        toUnixTimestamp64Micro(query_start_time_microseconds),
        toUnixTimestamp64Micro(event_time_microseconds)
    ) as peak_threads
SELECT
    if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result
FROM system.query_thread_log
WHERE
    event_time > now() - 60
    AND query_id = (
        SELECT query_id
        FROM system.query_log
        WHERE
            type = 'QueryFinish'
            AND event_time > now() - 60
            AND log_comment = 'check_concurrency_in_remote_queries2'
            AND current_database = currentDatabase()
            ORDER BY event_time DESC LIMIT 1
    );

--------------------
SELECT 'prefer_localhost_replica=1, async_socket_for_remote=0, remote query with a lot of union all (lot of threads)' AS testname;

-- that is actually a bad behaviour, but it used to work like that for a long time.
-- now is happens only for async_socket_for_remote=0 (while it is 1 by default)
-- see https://github.com/ClickHouse/ClickHouse/issues/53287

select count() from remote('127.0.0.1:9000', view(
{% for n in range(77) -%}
SELECT * FROM system.one {{ "UNION ALL" if not loop.last }}
{% endfor -%}
 )) SETTINGS log_comment='check_concurrency_in_remote_queries3', async_socket_for_remote=0, prefer_localhost_replica=1;

SYSTEM FLUSH LOGS;

WITH
    maxIntersections(
        toUnixTimestamp64Micro(query_start_time_microseconds),
        toUnixTimestamp64Micro(event_time_microseconds)
    ) as peak_threads
SELECT
    if(peak_threads >= 77, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result
FROM system.query_thread_log
WHERE
    event_time > now() - 60
    AND query_id = (
        SELECT query_id
        FROM system.query_log
        WHERE
            type = 'QueryFinish'
            AND event_time > now() - 60
            AND log_comment = 'check_concurrency_in_remote_queries3'
            AND current_database = currentDatabase()
            ORDER BY event_time DESC LIMIT 1
    );

-- less synthetic testcase from the issue https://github.com/ClickHouse/ClickHouse/issues/53287
-- it creates lot of streams because of many parts (one per part) + optimize_read_in_order=1 feature

SELECT 'prepare test schema' AS stage;

DROP TABLE IF EXISTS test_lot_of_parts_distributed;
DROP TABLE IF EXISTS test_lot_of_parts;

CREATE TABLE test_lot_of_parts
(
    `a` String,
    `b` LowCardinality(String),
    `c` DateTime64(3),
    `val` String,
)
ENGINE = MergeTree
ORDER BY (a, b, c)
SETTINGS parts_to_delay_insert=0;

CREATE TABLE test_lot_of_parts_distributed
(
    `a` String,
    `b` LowCardinality(String),
    `c` DateTime64(3),
    `val` String,
)
ENGINE = Distributed(test_shard_localhost, currentDatabase(), 'test_lot_of_parts', rand());

-- we need a lot of parts to make sure that we will have a lot of streams
SYSTEM STOP MERGES test_lot_of_parts;
INSERT INTO test_lot_of_parts (a, b, c, val)
   SELECT
     'foo' as a,
     'bar' as b,
     _CAST('1683504000', 'DateTime64') as c,
     'baz' as val
   FROM numbers_mt(95)
   SETTINGS max_block_size = 1, min_insert_block_size_bytes=1, min_insert_block_size_rows=1; --every row will be in separate part

select count() from system.parts where table = 'test_lot_of_parts' and active and database = currentDatabase();

SELECT 'prefer_localhost_replica=1, remote query with read in order' AS testname;

-- query which uses optimize_read_in_order=1
SELECT DISTINCT
    'val' AS fieldType,
    val AS value
FROM test_lot_of_parts_distributed
WHERE a = 'foo' AND value != '' AND positionCaseInsensitiveUTF8(value, 'baz') > 0 AND b = 'bar' AND c >= _CAST('1683504000', 'DateTime64')
ORDER BY c DESC
LIMIT 5
SETTINGS log_comment='check_concurrency_in_remote_queries4' FORMAT Null;

SYSTEM FLUSH LOGS;

WITH
    maxIntersections(
        toUnixTimestamp64Micro(query_start_time_microseconds),
        toUnixTimestamp64Micro(event_time_microseconds)
    ) as peak_threads
SELECT
    if(peak_threads BETWEEN 1 AND toUInt64(getSetting('max_threads')) + 2, 'ok', 'too many threads: ' || toString(peak_threads) ) AS result
FROM system.query_thread_log
WHERE
    event_time > now() - 60
    AND query_id = (
        SELECT query_id
        FROM system.query_log
        WHERE
            type = 'QueryFinish'
            AND event_time > now() - 60
            AND log_comment = 'check_concurrency_in_remote_queries4'
            AND current_database = currentDatabase()
            ORDER BY event_time DESC LIMIT 1
    );


SELECT 'prefer_localhost_replica=1 + async_socket_for_remote=0, remote query with read in order (lot of threads)' AS testname;

-- that is actually a bad behaviour, but it used to work like that for a long time.
-- now is happens only for async_socket_for_remote=0 (while it is 1 by default)

SELECT DISTINCT
    'val' AS fieldType,
    val AS value
FROM test_lot_of_parts_distributed
WHERE a = 'foo' AND value != '' AND positionCaseInsensitiveUTF8(value, 'baz') > 0 AND b = 'bar' AND c >= _CAST('1683504000', 'DateTime64')
ORDER BY c DESC
LIMIT 5
SETTINGS log_comment='check_concurrency_in_remote_queries5', async_socket_for_remote=0 FORMAT Null;

SYSTEM FLUSH LOGS;

WITH
    maxIntersections(
        toUnixTimestamp64Micro(query_start_time_microseconds),
        toUnixTimestamp64Micro(event_time_microseconds)
    ) as peak_threads
SELECT
    if(peak_threads >= 95, 'ok', 'too few threads: ' || toString(peak_threads) ) AS result
     -- we have 95 parts
FROM system.query_thread_log
WHERE
    event_time > now() - 60
    AND query_id = (
        SELECT query_id
        FROM system.query_log
        WHERE
            type = 'QueryFinish'
            AND event_time > now() - 60
            AND log_comment = 'check_concurrency_in_remote_queries5'
            AND current_database = currentDatabase()
            ORDER BY event_time DESC LIMIT 1
    );

DROP TABLE IF EXISTS test_lot_of_parts_distributed;
DROP TABLE IF EXISTS test_lot_of_parts;
